feat: add custom fields to events #1228

Merged

6 commits merged into parseablehq:main on Mar 21, 2025

Conversation

nikhilsinhaparseable (Contributor) commented Mar 7, 2025

p_user_agent - fetched from the User-Agent request header
p_src_ip - fetched from the request's connection info

Users can add additional headers to the ingest APIs in the format x-p-<key-name>: <value>,
e.g. x-p-environment: dev

The server then adds an environment field to the events with the value dev.
Users can add multiple custom headers; each is inserted as a separate field in the event.
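
As an illustration, an ingest request might look like this (endpoint path, stream name, and payload here are placeholders, not taken from this PR):

curl -X POST 'http://localhost:8000/api/v1/ingest' \
  -H 'Content-Type: application/json' \
  -H 'X-P-Stream: demo' \
  -H 'x-p-environment: dev' \
  -d '[{"level": "info", "message": "hello"}]'

Each event ingested into the demo stream would then carry p_user_agent, p_src_ip, and environment fields.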

Summary by CodeRabbit

  • New Features

    • Extended event processing and log ingestion to support customizable fields.
    • Now extracts additional header information, including user agent and source IP, for enhanced logging.
  • Refactor

    • Streamlined schema handling by integrating custom fields directly into records.
    • Improved header access by standardizing key references for better consistency.
    • Added a new function for managing custom fields in log processing.


coderabbitai bot commented Mar 7, 2025

Walkthrough

This pull request extends the event processing functionality by introducing a new parameter for custom fields across various modules. The changes update the method signatures for event formatting, ingestion, and log processing functions to accept an additional HashMap of custom fields. Furthermore, constants for user agent and source IP tracking are added, and helper functions are introduced to extract these fields from HTTP requests and integrate them into record batches.

Changes

  • src/event/format/json.rs, src/event/format/mod.rs — Added a new p_custom_fields parameter to the into_event and into_recordbatch methods of the EventFormat trait. Removed the manual timestamp field insertion in favor of add_parseable_fields for schema creation.
  • src/event/mod.rs — Introduced new constant strings USER_AGENT_KEY, SOURCE_IP_KEY, and FORMAT_KEY for identifying user agent and source IP information.
  • src/handlers/http/audit.rs, src/handlers/http/ingest.rs — Updated the audit middleware to use the USER_AGENT constant instead of a literal string. Modified several ingestion endpoints to support custom-fields extraction and updated their return types to Result<HttpResponse, PostError>.
  • src/handlers/http/modal/utils/ingest_utils.rs — Added the function get_custom_fields_from_header to extract custom fields from HTTP request headers. Updated the signatures of flatten_and_push_logs and push_logs to include the new custom-fields parameter.
  • src/utils/arrow/mod.rs — Added the public function add_parseable_fields to modify a record batch by prepending a timestamp field and appending custom fields to the schema. Updated imports to support creating string arrays and fields for the custom data.
  • src/connectors/kafka/processor.rs — Introduced a new variable p_custom_fields, initialized as an empty map and populated with USER_AGENT_KEY, which is passed to the into_event method when creating a new event.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant IngestHandler as Ingest Handler
    participant Utils as Ingest Utils
    participant Event as EventFormat
    participant Arrow as Arrow Utils

    Client->>IngestHandler: HTTP Request (with headers)
    IngestHandler->>Utils: get_custom_fields_from_header(req)
    Utils-->>IngestHandler: custom fields
    IngestHandler->>Event: call into_event(..., custom_fields)
    Event->>Event: call into_recordbatch(..., custom_fields)
    Event->>Arrow: call add_parseable_fields(rb, timestamp, custom_fields)
    Arrow-->>Event: modified RecordBatch
    Event-->>IngestHandler: processed event
    IngestHandler-->>Client: HttpResponse

Suggested reviewers

  • de-sh

Poem

I hopped through lines of code today,
Adding custom fields along the way.
Timestamps and headers now dance in line,
Each log and record perfectly align.
With bytes and hops, my day is bright 🐰,
Celebrating changes that feel just right!
For every sprint, I leap with delight.


coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/utils/arrow/mod.rs (1)

144-183: Well-implemented function for adding custom fields to record batches

This function is well-structured and handles the addition of custom fields properly:

  • Correctly preserves the original record batch data
  • Inserts timestamp at the beginning
  • Sorts custom fields by key for consistent ordering
  • Handles field type definitions appropriately

One minor optimization suggestion would be to pre-allocate the vectors with the correct capacity.

You could optimize vector allocations by pre-calculating the final size:

-    let mut fields: Vec<Field> = schema.fields().iter().map(|f| f.as_ref().clone()).collect();
-    let mut columns: Vec<ArrayRef> = rb.columns().to_vec();
+    let final_size = schema.fields().len() + 1 + p_custom_fields.len();
+    let mut fields: Vec<Field> = Vec::with_capacity(final_size);
+    let mut columns: Vec<ArrayRef> = Vec::with_capacity(final_size);
+    
+    // Add existing fields and columns
+    fields.extend(schema.fields().iter().map(|f| f.as_ref().clone()));
+    columns.extend(rb.columns().to_vec());
src/handlers/http/modal/utils/ingest_utils.rs (1)

163-168: Consider adding validation for custom header keys.

While the implementation correctly extracts headers with the x-p- prefix, consider adding validation to ensure the trimmed keys follow any specific naming restrictions your schema might have.

 if header_name.starts_with("x-p-") && !IGNORE_HEADERS.contains(&header_name) {
     if let Ok(value) = header_value.to_str() {
         let key = header_name.trim_start_matches("x-p-");
+        // Validate key format - alphanumeric and underscore only
+        if key.chars().all(|c| c.is_alphanumeric() || c == '_') {
             p_custom_fields.insert(key.to_string(), value.to_string());
+        }
     }
 }
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 10aef7b and bc70091.

📒 Files selected for processing (7)
  • src/event/format/json.rs (2 hunks)
  • src/event/format/mod.rs (4 hunks)
  • src/event/mod.rs (1 hunks)
  • src/handlers/http/audit.rs (2 hunks)
  • src/handlers/http/ingest.rs (19 hunks)
  • src/handlers/http/modal/utils/ingest_utils.rs (3 hunks)
  • src/utils/arrow/mod.rs (3 hunks)
🔇 Additional comments (22)
src/event/mod.rs (1)

38-39: New constants for custom field keys

The addition of these constants establishes standardized keys for user agent and source IP tracking in event data. This enhances code consistency across the application.
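
For context, the constants presumably look something like this; the values are inferred from the p_user_agent / p_src_ip field names in the PR description, so treat them as an assumption:

// src/event/mod.rs (sketch; actual definitions may differ)
pub const USER_AGENT_KEY: &str = "p_user_agent";
pub const SOURCE_IP_KEY: &str = "p_src_ip";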

src/handlers/http/audit.rs (2)

26-26: Improved header import for HTTP standards

Adding the import for the standard HTTP header constant improves code maintainability.


89-89: Using standard HTTP header constant instead of string literal

Good change - replacing the string literal with the standard HTTP header constant improves code maintainability and reduces the risk of typos.

src/utils/arrow/mod.rs (1)

43-61: Updated imports to support custom fields functionality

The import changes properly support the new functionality for handling custom fields in record batches.

src/event/format/json.rs (2)

151-151: Added parameter for custom fields

The interface has been extended to support custom fields in events, which aligns with the PR objective.


171-171: Passing custom fields to record batch creation

Successfully integrated the custom fields parameter into the existing workflow.

src/handlers/http/modal/utils/ingest_utils.rs (5)

19-20: Good addition of HashMap import for custom fields functionality.

This import is necessary for the new functionality to handle custom fields.


31-31: Good usage of constants for special field keys.

Using constants for special field keys like SOURCE_IP_KEY and USER_AGENT_KEY improves code maintainability.


46-46: Good use of constants for ignored headers.

Defining which headers to ignore is a good practice to maintain consistency and avoid redundant fields.


52-52: Appropriate signature update to accept custom fields.

The function signature has been correctly updated to accept custom fields as a parameter.


146-171: Well-implemented custom fields extraction function.

The implementation is robust, handling:

  1. User agent extraction with fallback to default
  2. Source IP retrieval with default handling
  3. Custom header extraction with proper prefix filtering

It properly follows the PR's requirement of handling x-p-<key-name> headers.

src/handlers/http/ingest.rs (8)

41-41: Good import update to include the new custom fields function.

The imports have been correctly updated to include the new get_custom_fields_from_header function.


75-77: Correctly implemented custom fields extraction in the ingest handler.

The code properly extracts custom fields from the request and passes them to the flatten_and_push_logs function.


98-98: Empty HashMap provided for internal stream ingestion.

For internal streams, an empty HashMap is provided which is appropriate since internal streams don't need custom fields from HTTP headers.


128-130: Consistent implementation of custom fields across handlers.

The custom fields extraction and usage is consistently implemented across all handlers.


162-164: Custom fields properly implemented in metrics ingestion handler.

The implementation matches the pattern established in other handlers, maintaining consistency.


192-194: Custom fields properly implemented in traces ingestion handler.

The implementation follows the same pattern as other handlers, ensuring consistency.


243-244: Custom fields implementation in post_event handler.

The implementation correctly extracts and uses custom fields in the post_event handler.


395-400: Test case updated with custom fields parameter.

The test has been properly updated to include the new required parameter, using an empty HashMap.

src/event/format/mod.rs (3)

118-118: Correct trait signature update for custom fields.

The EventFormat trait has been properly updated to accept custom fields.


139-142: Improved implementation using add_parseable_fields.

The code now uses the add_parseable_fields utility function to add both timestamp and custom fields to the record batch. This is a cleaner approach than the previous implementation.
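
Conceptually, the change reduces to a single post-processing step on the freshly built batch. A minimal sketch, assuming add_parseable_fields from src/utils/arrow/mod.rs is in scope with the signature shown later in this review:

use std::collections::HashMap;

use arrow_array::RecordBatch;
use arrow_schema::ArrowError;
use chrono::{DateTime, Utc};

// sketch: into_recordbatch builds `rb`, then delegates all Parseable
// metadata handling to the one helper
fn finish_batch(
    rb: RecordBatch,
    p_timestamp: DateTime<Utc>,
    p_custom_fields: &HashMap<String, String>,
) -> Result<RecordBatch, ArrowError> {
    // prepends p_timestamp and appends the sorted custom fields
    add_parseable_fields(rb, p_timestamp, p_custom_fields)
}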


179-179: Consistent interface update for into_event method.

The into_event method signature has been updated to match the changes in into_recordbatch, maintaining consistency.

coderabbitai bot previously approved these changes Mar 7, 2025

coderabbitai bot left a comment

Actionable comments posted: 0

🔭 Outside diff range comments (1)
src/utils/arrow/mod.rs (1)

204-270: 💡 Verification agent

❓ Verification inconclusive

Missing tests for the new add_parseable_fields function.

While there are tests for other functions in this module, there are no tests specifically for the newly added add_parseable_fields function. Consider adding tests that verify:

  1. Custom fields are correctly added to the RecordBatch
  2. The timestamp is correctly inserted
  3. Error cases are properly handled
  4. Behavior when the input RecordBatch is empty
  5. Behavior with empty custom fields

🏁 Script executed:

#!/bin/bash
# Check if there are any tests for the add_parseable_fields function in the codebase
rg -l "test.*add_parseable_fields" --type rust

Length of output: 48


Actionable: Add tests for the new add_parseable_fields function

After verifying the repository, there are no tests referencing add_parseable_fields. Please add tests covering the following aspects:

  • Custom fields are correctly added to the RecordBatch.
  • The timestamp is correctly inserted.
  • Error cases are properly handled.
  • Behavior when the input RecordBatch is empty.
  • Behavior with empty custom fields.
🧹 Nitpick comments (1)
src/utils/arrow/mod.rs (1)

144-192: The implementation correctly adds custom fields to RecordBatches.

The add_parseable_fields function properly handles the addition of a timestamp and custom fields to the RecordBatch. The sorting of keys ensures consistent field order, and the implementation correctly maintains the Arrow memory model with appropriate use of Arc.

Consider these improvements:

  1. Add tests for this new function to verify behavior with various inputs
  2. Check if a timestamp field already exists in the schema before inserting it
  3. Add documentation for possible errors that could be returned
 pub fn add_parseable_fields(
     rb: RecordBatch,
     p_timestamp: DateTime<Utc>,
     p_custom_fields: &HashMap<String, String>,
 ) -> Result<RecordBatch, ArrowError> {
     // Return Result for proper error handling
+    // Possible errors:
+    // - Schema mismatch if fields cannot be combined
+    // - Memory allocation errors for large batches

     // Add custom fields in sorted order
     let mut sorted_keys: Vec<&String> = p_custom_fields.keys().collect();
     sorted_keys.sort();

     let schema = rb.schema();
     let row_count = rb.num_rows();

+    // Check if timestamp field already exists
+    let timestamp_field_exists = schema.fields().iter().any(|f| f.name() == DEFAULT_TIMESTAMP_KEY);
+
     let mut fields = schema
         .fields()
         .iter()
         .map(|f| f.as_ref().clone())
         .collect_vec();
-    fields.insert(
-        0,
-        Field::new(
-            DEFAULT_TIMESTAMP_KEY,
-            DataType::Timestamp(TimeUnit::Millisecond, None),
-            true,
-        ),
-    );
+    if !timestamp_field_exists {
+        fields.insert(
+            0,
+            Field::new(
+                DEFAULT_TIMESTAMP_KEY,
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+        );
+    }
     fields.extend(
         sorted_keys
             .iter()
             .map(|k| Field::new(*k, DataType::Utf8, true)),
     );

     let mut columns = rb.columns().iter().map(Arc::clone).collect_vec();
-    columns.insert(
-        0,
-        Arc::new(get_timestamp_array(p_timestamp, row_count)) as ArrayRef,
-    );
+    if !timestamp_field_exists {
+        columns.insert(
+            0,
+            Arc::new(get_timestamp_array(p_timestamp, row_count)) as ArrayRef,
+        );
+    }
     columns.extend(sorted_keys.iter().map(|k| {
         let value = p_custom_fields.get(*k).unwrap();
         Arc::new(StringArray::from_iter_values(
             std::iter::repeat(value).take(row_count),
         )) as ArrayRef
     }));

     // Create the new schema and batch
     let new_schema = Arc::new(Schema::new(fields));
     RecordBatch::try_new(new_schema, columns)
 }
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between bc70091 and 5f746eb.

📒 Files selected for processing (7)
  • src/event/format/json.rs (2 hunks)
  • src/event/format/mod.rs (4 hunks)
  • src/event/mod.rs (1 hunks)
  • src/handlers/http/audit.rs (2 hunks)
  • src/handlers/http/ingest.rs (19 hunks)
  • src/handlers/http/modal/utils/ingest_utils.rs (3 hunks)
  • src/utils/arrow/mod.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • src/event/mod.rs
  • src/handlers/http/audit.rs
  • src/handlers/http/modal/utils/ingest_utils.rs
  • src/event/format/mod.rs
  • src/event/format/json.rs
  • src/handlers/http/ingest.rs
🔇 Additional comments (1)
src/utils/arrow/mod.rs (1)

43-48: Imports are appropriately updated for the new functionality.

The updated imports correctly include the necessary types for handling custom fields: HashMap for storing key-value pairs, and Arrow types like StringArray, ArrowError, DataType, Field, and TimeUnit for creating and manipulating Arrow data structures.

coderabbitai bot previously approved these changes Mar 13, 2025

coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (3)
src/handlers/http/modal/utils/ingest_utils.rs (3)

146-156: Good implementation for extracting HTTP headers.

The function correctly extracts the User-Agent and source IP from the request. Consider adding error logging for cases where these values cannot be extracted.

 pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap<String, String> {
     let user_agent = req
         .headers()
         .get("User-Agent")
         .and_then(|a| a.to_str().ok())
-        .unwrap_or_default();
+        .unwrap_or_else(|| {
+            log::debug!("User-Agent header not found or not valid UTF-8");
+            ""
+        });

     let conn = req.connection_info().clone();

-    let source_ip = conn.realip_remote_addr().unwrap_or_default();
+    let source_ip = conn.realip_remote_addr().unwrap_or_else(|| {
+        log::debug!("Source IP not found in connection info");
+        ""
+    });

157-158: Consider conditional inclusion of empty values.

The current implementation always adds User-Agent and source IP fields, even when they're empty. You might want to add them only when they contain non-empty values.

-    p_custom_fields.insert(USER_AGENT_KEY.to_string(), user_agent.to_string());
-    p_custom_fields.insert(SOURCE_IP_KEY.to_string(), source_ip.to_string());
+    if !user_agent.is_empty() {
+        p_custom_fields.insert(USER_AGENT_KEY.to_string(), user_agent.to_string());
+    }
+    if !source_ip.is_empty() {
+        p_custom_fields.insert(SOURCE_IP_KEY.to_string(), source_ip.to_string());
+    }

161-169: Check for duplicate header names.

If multiple headers with the same name (after stripping "x-p-") are present, the last one will overwrite previous values. Consider logging a warning when this happens, or implement a strategy to handle duplicates (e.g., comma-separated values).

             if let Ok(value) = header_value.to_str() {
                 let key = header_name.trim_start_matches("x-p-");
-                p_custom_fields.insert(key.to_string(), value.to_string());
+                if p_custom_fields.contains_key(key) {
+                    log::warn!("Duplicate custom field key '{}', overwriting previous value", key);
+                }
+                p_custom_fields.insert(key.to_string(), value.to_string());
             }
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 5f746eb and 78bc79c.

📒 Files selected for processing (7)
  • src/event/format/json.rs (2 hunks)
  • src/event/format/mod.rs (4 hunks)
  • src/event/mod.rs (1 hunks)
  • src/handlers/http/audit.rs (2 hunks)
  • src/handlers/http/ingest.rs (20 hunks)
  • src/handlers/http/modal/utils/ingest_utils.rs (3 hunks)
  • src/utils/arrow/mod.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • src/event/mod.rs
  • src/handlers/http/audit.rs
  • src/utils/arrow/mod.rs
  • src/event/format/json.rs
  • src/handlers/http/ingest.rs
  • src/event/format/mod.rs
🔇 Additional comments (6)
src/handlers/http/modal/utils/ingest_utils.rs (6)

19-21: Good addition of required imports.

The new imports for HashMap and HttpRequest are appropriately added to support the custom fields feature.


46-46: Well-defined constant for header exclusions.

Good practice to define a constant array for headers that should be ignored when processing custom headers.


48-53: Function signature properly updated.

The function signature has been correctly updated to accept the new p_custom_fields parameter, aligning with the PR objective of supporting custom fields.


59-59: Consistent propagation of custom fields.

All calls to push_logs have been correctly updated to pass the p_custom_fields parameter, ensuring consistent behavior across different log sources.

Also applies to: 66-67, 73-74, 80-81, 84-85


90-95: Function signature properly updated for push_logs.

The push_logs function signature has been correctly updated to accept custom fields from various log sources.


139-139: Custom fields correctly passed to event processor.

The custom fields are properly passed to the into_event method, ensuring they are included in the event data.

de-sh (Contributor) left a comment

Would be great if we had tests to validate behavior of add_parseable_fields
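
A minimal sketch of such a test, assuming DEFAULT_TIMESTAMP_KEY is "p_timestamp" and that add_parseable_fields has the signature shown in the review above (module placement and field names here are illustrative):

#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use chrono::Utc;

    use super::add_parseable_fields;

    #[test]
    fn adds_timestamp_and_custom_fields() {
        // a one-column batch with three rows
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
        let col: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
        let rb = RecordBatch::try_new(schema, vec![col]).unwrap();

        let mut custom = HashMap::new();
        custom.insert("environment".to_owned(), "dev".to_owned());

        let out = add_parseable_fields(rb, Utc::now(), &custom).unwrap();

        // timestamp is prepended, the custom field is appended
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);
        assert_eq!(out.schema().field(0).name(), "p_timestamp");
        assert_eq!(out.schema().field(2).name(), "environment");
    }
}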

coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
src/utils/arrow/mod.rs (2)

146-194: Consider adding tests for this new function

Given the importance of this function for the custom fields feature, it would be beneficial to add unit tests that verify:

  • Custom fields are properly added
  • Fields appear in the expected order
  • The row count is preserved
  • Edge cases (empty HashMap, large number of fields, etc.)

185-186: Potential unwrap() risk

The unwrap() call on line 185 could potentially panic if the key doesn't exist in the HashMap. While this appears safe since the keys are directly derived from the HashMap itself in the loop, consider using .expect() with a descriptive message for better error reporting in case of future modifications.

-        let value = p_custom_fields.get(*k).unwrap();
+        let value = p_custom_fields.get(*k).expect("Key must exist in HashMap");
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 78bc79c and fdcf9dd.

📒 Files selected for processing (2)
  • src/event/format/mod.rs (4 hunks)
  • src/utils/arrow/mod.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/event/format/mod.rs
🔇 Additional comments (2)
src/utils/arrow/mod.rs (2)

146-194: Implementation looks solid with good error handling

The new add_parseable_fields function correctly:

  • Sorts keys for deterministic column order
  • Handles schema and data modifications properly
  • Returns Result for error propagation

The implementation will efficiently add both the timestamp field and custom fields to record batches.


43-48: Imports organized well for the new functionality

The additional imports are correctly organized to support the new functionality for handling custom fields and working with Arrow data types.

coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (2)
src/handlers/http/modal/utils/ingest_utils.rs (2)

147-169: 🛠️ Refactor suggestion

Add validation for empty keys after stripping prefix.

The function doesn't validate if a key is empty after trimming the "x-p-" prefix. If a header with name "x-p-" (with nothing after) is provided, it would result in an empty key in the HashMap.

        if header_name.starts_with("x-p-") && !IGNORE_HEADERS.contains(&header_name) {
            if let Ok(value) = header_value.to_str() {
                let key = header_name.trim_start_matches("x-p-");
-               p_custom_fields.insert(key.to_string(), value.to_string());
+               if !key.is_empty() {
+                   p_custom_fields.insert(key.to_string(), value.to_string());
+               } else {
+                   log::warn!("Ignoring header with empty key after prefix: {}", header_name);
+               }
            }
        }

147-179: 🛠️ Refactor suggestion

Implement sanitization for custom field values.

Header values are inserted directly into the custom fields HashMap without any sanitization. This could potentially lead to injection attacks or issues with malformed data affecting downstream processing.

Consider adding a sanitization function for the header values before insertion:

// Add this utility function
+ fn sanitize_header_value(value: &str) -> String {
+     // Implement appropriate sanitization logic
+     // This could include escaping special characters,
+     // removing control characters, limiting length, etc.
+     value.chars()
+         .filter(|&c| !c.is_control())
+         .take(1000)  // Limit size
+         .collect()
+ }

// Then use it when inserting values
-    p_custom_fields.insert(USER_AGENT_KEY.to_string(), user_agent.to_string());
+    p_custom_fields.insert(USER_AGENT_KEY.to_string(), sanitize_header_value(user_agent));

-    p_custom_fields.insert(SOURCE_IP_KEY.to_string(), source_ip.to_string());
+    p_custom_fields.insert(SOURCE_IP_KEY.to_string(), sanitize_header_value(source_ip));

// And in the header processing loop
-                p_custom_fields.insert(key.to_string(), value.to_string());
+                p_custom_fields.insert(key.to_string(), sanitize_header_value(value));

// And for the format key
-                p_custom_fields.insert(FORMAT_KEY.to_string(), value.to_string());
+                p_custom_fields.insert(FORMAT_KEY.to_string(), sanitize_header_value(value));
🧹 Nitpick comments (2)
src/handlers/http/modal/utils/ingest_utils.rs (2)

164-168: Consider case-insensitive header name matching.

HTTP headers are case-insensitive by specification, but the check header_name.starts_with("x-p-") is case-sensitive. This could cause issues if clients send headers with different casing (e.g., "X-P-Environment").

-        if header_name.starts_with("x-p-") && !IGNORE_HEADERS.contains(&header_name) {
+        let header_name_lower = header_name.to_lowercase();
+        if header_name_lower.starts_with("x-p-") && !IGNORE_HEADERS.iter().any(|&h| h.to_lowercase() == header_name_lower) {
             if let Ok(value) = header_value.to_str() {
-                let key = header_name.trim_start_matches("x-p-");
+                let key = header_name_lower.trim_start_matches("x-p-");
                 p_custom_fields.insert(key.to_string(), value.to_string());
             }
         }

181-224: Tests are well-structured but could be more comprehensive.

The tests cover basic functionality but could be expanded to include edge cases:

  • Test with malformed header values
  • Test with empty keys after stripping "x-p-"
  • Test with case variations (e.g., X-P-Environment vs x-p-environment)
  • Test with duplicate keys (standard + custom headers mapping to same key)

Consider adding the following test case:

#[test]
fn test_get_custom_fields_from_header_with_case_insensitive_headers() {
    let req = TestRequest::default()
        .insert_header((USER_AGENT, "TestUserAgent"))
        .insert_header(("X-P-Environment", "dev"))  // Uppercase X-P
        .to_http_request();

    let custom_fields = get_custom_fields_from_header(req);

    assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "TestUserAgent");
    assert_eq!(custom_fields.get("environment").unwrap(), "dev");
}

#[test]
fn test_get_custom_fields_from_header_with_empty_key() {
    let req = TestRequest::default()
        .insert_header((USER_AGENT, "TestUserAgent"))
        .insert_header(("x-p-", "value"))  // Empty key after prefix
        .to_http_request();

    let custom_fields = get_custom_fields_from_header(req);

    assert_eq!(custom_fields.get(USER_AGENT_KEY).unwrap(), "TestUserAgent");
    assert!(!custom_fields.contains_key(""));  // Empty key should be ignored
}
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between fdcf9dd and ae7deb5.

📒 Files selected for processing (7)
  • src/event/format/json.rs (2 hunks)
  • src/event/format/mod.rs (4 hunks)
  • src/event/mod.rs (1 hunks)
  • src/handlers/http/audit.rs (2 hunks)
  • src/handlers/http/ingest.rs (20 hunks)
  • src/handlers/http/modal/utils/ingest_utils.rs (3 hunks)
  • src/utils/arrow/mod.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • src/event/mod.rs
  • src/handlers/http/audit.rs
  • src/event/format/json.rs
  • src/event/format/mod.rs
  • src/utils/arrow/mod.rs
  • src/handlers/http/ingest.rs
🧰 Additional context used
🧬 Code Definitions (1)
src/handlers/http/modal/utils/ingest_utils.rs (4)
src/handlers/http/ingest.rs (1) (1)
  • ingest (51-89)
src/handlers/http/kinesis.rs (1) (1)
  • flatten_kinesis_logs (60-85)
src/otel/logs.rs (1) (1)
  • flatten_otel_logs (140-170)
src/otel/traces.rs (1) (1)
  • flatten_otel_traces (89-124)
🔇 Additional comments (3)
src/handlers/http/modal/utils/ingest_utils.rs (3)

19-47: Added imports and constants look good.

The additional imports and constants appropriately support the new custom fields functionality. Using a constant array IGNORE_HEADERS to exclude specific headers is a clean approach.


49-89: Function signature update is correctly implemented.

The flatten_and_push_logs function has been properly updated to accept the new p_custom_fields parameter, and all calls to push_logs within it correctly pass this parameter.


91-145: Function signature and implementation updates are correct.

The push_logs function signature has been properly updated, and the custom fields are correctly passed to the into_event method at line 140.

coderabbitai bot left a comment

Actionable comments posted: 1

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between ae7deb5 and 23564e8.

📒 Files selected for processing (1)
  • src/connectors/kafka/processor.rs (3 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
src/connectors/kafka/processor.rs (4)
src/event/format/mod.rs (1) (1)
  • new (116-121)
src/metadata.rs (1) (1)
  • new (95-130)
src/event/format/json.rs (1) (1)
  • new (42-47)
src/connectors/kafka/sink.rs (1) (1)
  • new (43-62)
🪛 GitHub Actions: Lint, Test and Coverage Report
src/connectors/kafka/processor.rs

[error] 79-79: failed to resolve: use of undeclared type HashMap.

🔇 Additional comments (4)
src/connectors/kafka/processor.rs (4)

32-32: Import USER_AGENT_KEY constant for custom fields

The import of USER_AGENT_KEY aligns with the PR objective to add custom fields to events, specifically the user agent information.


79-81: Custom fields implementation for Kafka events

This code appropriately initializes the custom fields map and adds the Kafka user agent, matching the PR's goal to enhance events with custom fields. Setting "kafka" as the user agent value helps with tracking the source of events.

🧰 Tools
🪛 GitHub Actions: Lint, Test and Coverage Report

[error] 79-79: failed to resolve: use of undeclared type HashMap.
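
Presumably the pipeline failure only needs the missing import at the top of src/connectors/kafka/processor.rs:

// fixes "use of undeclared type HashMap"
use std::collections::HashMap;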


91-91: Added custom fields parameter to into_event

The parameter addition correctly integrates with the method signature change to support custom fields in events, fulfilling the PR objective.


43-95:

❓ Verification inconclusive

Verify method signature in json::Event::into_event

The updated call to into_event() adds a new parameter, but I don't see the related method signature change in the provided code. Ensure that the into_event method in json::Event has been updated to accept this new parameter.


🏁 Script executed:

#!/bin/bash
# Verify that the into_event method signature has been updated
grep -r "fn into_event" --include="*.rs" src/event/

Length of output: 140


Heads-up: Verify Updated into_event Signature

The new call to into_event() now passes nine parameters. However, the grep results only confirm the method's existence in both src/event/format/json.rs and src/event/format/mod.rs. Please double-check that the declarations and implementations (especially the one used by ParseableSinkProcessor in src/connectors/kafka/processor.rs) have been updated to accept the new parameter. In particular, ensure that:

  • The implementation in src/event/format/json.rs includes the new parameter.
  • The corresponding declaration (or trait signature) in src/event/format/mod.rs has been updated to match.
🧰 Tools
🪛 GitHub Actions: Lint, Test and Coverage Report

[error] 79-79: failed to resolve: use of undeclared type HashMap.

coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
src/utils/arrow/mod.rs (1)

184-195: Consider adding type detection for custom field values

Currently, all custom fields are added as UTF8 string types. For improved data analytics, you might want to consider detecting and converting values to appropriate Arrow types (numbers, booleans, etc.) when possible.

 for key in sorted_keys {
     if !field_names.contains(key) {
-        fields.push(Field::new(key, DataType::Utf8, true));
+        // Try to parse the value into a more specific type
+        let value = p_custom_fields.get(key).unwrap();
+        let data_type = if value.parse::<i64>().is_ok() {
+            DataType::Int64
+        } else if value.parse::<f64>().is_ok() {
+            DataType::Float64
+        } else if value == "true" || value == "false" {
+            DataType::Boolean
+        } else {
+            DataType::Utf8
+        };
+        fields.push(Field::new(key, data_type.clone(), true));
         field_names.insert(key.to_string());
 
-        let value = p_custom_fields.get(key).unwrap();
-        columns.push(Arc::new(StringArray::from_iter_values(
-            std::iter::repeat(value).take(row_count),
-        )) as ArrayRef);
+        // Build an array of the detected type, repeated for every row
+        let array: ArrayRef = match data_type {
+            DataType::Int64 => Arc::new(arrow_array::Int64Array::from_iter_values(
+                std::iter::repeat(value.parse::<i64>().unwrap()).take(row_count),
+            )),
+            DataType::Float64 => Arc::new(arrow_array::Float64Array::from_iter_values(
+                std::iter::repeat(value.parse::<f64>().unwrap()).take(row_count),
+            )),
+            DataType::Boolean => Arc::new(arrow_array::BooleanArray::from(vec![
+                value == "true";
+                row_count
+            ])),
+            _ => Arc::new(StringArray::from_iter_values(
+                std::iter::repeat(value).take(row_count),
+            )),
+        };
+        columns.push(array);
     }
 }
src/handlers/http/modal/utils/ingest_utils.rs (1)

164-175: Implement field name validation for Arrow schema compatibility

While you've correctly handled empty keys after trimming the "x-p-" prefix, there's no validation that the resulting key names are valid for Arrow schema fields. Consider adding validation to ensure key names don't contain characters that might be problematic in Arrow field names.

 if header_name.starts_with("x-p-") && !IGNORE_HEADERS.contains(&header_name) {
     if let Ok(value) = header_value.to_str() {
         let key = header_name.trim_start_matches("x-p-");
         if !key.is_empty() {
-            p_custom_fields.insert(key.to_string(), value.to_string());
+            // Validate that the key name is valid for Arrow schema
+            if is_valid_arrow_field_name(key) {
+                p_custom_fields.insert(key.to_string(), value.to_string());
+            } else {
+                warn!(
+                    "Ignoring header with invalid Arrow field name: {}",
+                    header_name
+                );
+            }
         } else {
             warn!(
                 "Ignoring header with empty key after prefix: {}",
                 header_name
             );
         }
     }
 }

 // Add a helper function to validate field names
+fn is_valid_arrow_field_name(name: &str) -> bool {
+    // Implement validation logic based on Arrow's requirements
+    // For example, disallow spaces and special characters
+    !name.contains(|c: char| c.is_whitespace() || ".,;:!@#$%^&*()+=[]{}|\\<>/".contains(c))
+}
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 48b90ff and d547ef5.

📒 Files selected for processing (2)
  • src/handlers/http/modal/utils/ingest_utils.rs (3 hunks)
  • src/utils/arrow/mod.rs (3 hunks)
🧰 Additional context used
🧬 Code Definitions (2)
src/handlers/http/modal/utils/ingest_utils.rs (4)
src/handlers/http/ingest.rs (1) (1)
  • ingest (51-89)
src/handlers/http/kinesis.rs (1) (1)
  • flatten_kinesis_logs (60-85)
src/otel/logs.rs (1) (1)
  • flatten_otel_logs (140-170)
src/otel/traces.rs (1) (1)
  • flatten_otel_traces (89-124)
src/utils/arrow/mod.rs (3)
src/metadata.rs (2) (2)
  • schema (146-151)
  • new (95-130)
src/event/format/mod.rs (1) (1)
  • new (116-121)
src/event/format/json.rs (1) (1)
  • new (42-47)
🔇 Additional comments (4)
src/utils/arrow/mod.rs (1)

149-200: Well-implemented custom fields functionality

The add_parseable_fields function provides a clean way to extend a RecordBatch with custom fields from a HashMap. The implementation properly handles duplicate fields by checking against existing field names before insertion, and sorts keys for deterministic output order.

src/handlers/http/modal/utils/ingest_utils.rs (3)

53-53: Good approach to passing custom fields through the pipeline

The addition of the p_custom_fields parameter to existing functions allows for flexible extension of log data with custom fields without disrupting the existing code flow.


147-186: Consider adding sanitization for custom field values

The custom field values extracted from headers are added directly without any validation or sanitization. Consider implementing sanitization for the header values to prevent potential injection attacks or malformed data.

#!/bin/bash
# Check if there's any validation/sanitization of header values elsewhere in the codebase
rg -A 3 -B 3 "header_value.to_str\(\)" --type rust | grep -v "get_custom_fields_from_header"

188-244: Comprehensive test coverage for the new functionality

The tests cover various scenarios for the get_custom_fields_from_header function, including custom fields, ignored headers, format key, and empty headers. This is excellent test coverage for the new functionality.

coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/handlers/http/modal/utils/ingest_utils.rs (2)

149-207: Well-implemented extraction of custom fields with proper validation.

The implementation correctly:

  • Extracts user agent and source IP information
  • Processes headers with "x-p-" prefix
  • Validates non-empty keys
  • Implements length limits with truncation
  • Handles special cases (LOG_SOURCE_KEY)

Consider adding a brief documentation comment explaining the function's purpose, expected header format, and how it handles the special case for LOG_SOURCE_KEY.

+/// Extracts custom fields from HTTP request headers.
+///
+/// This function:
+/// - Always includes the User-Agent and source IP as default fields
+/// - Processes headers with the "x-p-" prefix as custom fields
+/// - Handles LOG_SOURCE_KEY specially by adding it as a FORMAT_KEY field
+/// - Enforces limits on the number and size of custom fields
 pub fn get_custom_fields_from_header(req: HttpRequest) -> HashMap<String, String> {

209-265: Good test coverage with various scenarios.

The tests cover key scenarios including:

  • Custom fields extraction
  • Ignored headers handling
  • Format key special case
  • Empty key validation

Consider adding two additional tests to verify limit handling:

  1. A test that verifies truncation when field values exceed MAX_FIELD_VALUE_LENGTH
  2. A test that verifies the maximum number of custom fields is enforced
#[test]
fn test_field_value_truncation() {
    let long_value = "a".repeat(MAX_FIELD_VALUE_LENGTH + 50);
    let req = TestRequest::default()
        .insert_header(("x-p-long", long_value.as_str()))
        .to_http_request();

    let custom_fields = get_custom_fields_from_header(req);
    
    assert_eq!(custom_fields.get("long").unwrap().len(), MAX_FIELD_VALUE_LENGTH);
}

#[test]
fn test_max_fields_limit() {
    let mut req_builder = TestRequest::default();
    
    // Add more than MAX_CUSTOM_FIELDS custom headers
    for i in 0..MAX_CUSTOM_FIELDS + 5 {
        req_builder = req_builder.insert_header((format!("x-p-field{}", i), "test"));
    }
    
    let req = req_builder.to_http_request();
    let custom_fields = get_custom_fields_from_header(req);
    
    // We expect MAX_CUSTOM_FIELDS (discounting the default fields that are always added)
    assert!(custom_fields.len() <= MAX_CUSTOM_FIELDS);
}
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between d547ef5 and 718c9b2.

📒 Files selected for processing (1)
  • src/handlers/http/modal/utils/ingest_utils.rs (3 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
src/handlers/http/modal/utils/ingest_utils.rs (4)
src/handlers/http/ingest.rs (1) (1)
  • ingest (51-89)
src/handlers/http/kinesis.rs (1) (1)
  • flatten_kinesis_logs (60-85)
src/otel/logs.rs (1) (1)
  • flatten_otel_logs (140-170)
src/otel/traces.rs (1) (1)
  • flatten_otel_traces (89-124)
🔇 Additional comments (3)
src/handlers/http/modal/utils/ingest_utils.rs (3)

47-49: LGTM! Constants for limits and ignored headers.

The implementation includes essential security measures with well-defined constants for:

  • List of headers to ignore
  • Maximum number of custom fields (10)
  • Maximum field value length (100 characters)

These limits help prevent potential DoS attacks from excessive custom headers.
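
The declarations presumably look roughly like this; the exact ignored-header list is an assumption, while the two limit values come from this review:

// sketch; header names in IGNORE_HEADERS are assumed, not confirmed
const IGNORE_HEADERS: [&str; 2] = ["x-p-stream", "x-p-log-source"];
const MAX_CUSTOM_FIELDS: usize = 10;
const MAX_FIELD_VALUE_LENGTH: usize = 100;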


51-56: Function signature update correctly propagates custom fields parameter.

The signature change appropriately adds the custom fields parameter to allow passing header-derived fields through the event processing pipeline.


62-87: All push_logs calls correctly include the custom fields parameter.

The parameter has been consistently passed to all call sites in different log source scenarios, ensuring custom fields are properly propagated regardless of the log source type.

nitisht merged commit 6fe35a6 into parseablehq:main Mar 21, 2025
14 checks passed